{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Invoke a SageMaker Endpoint from Kinesis\n",
    "We will create an AWS Lambda function that invokes a SageMaker Endpoint to predict the `star_rating` on our incoming streaming data (reviews). We can use that Lambda function to transform our data in the Amazon Kinesis Data Firehose delivery stream, and to pre-process the streaming data in Kinesis DataAnalytics."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## _Transform Data in Kinesis Data Firehose delivery stream_\n",
    "<img src=\"img/kinesis_firehose_transform.png\" width=\"90%\" align=\"left\">"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## _Preprocess streaming data in Kinesis Data Analytics_\n",
    "<img src=\"img/kinesis-analytics-transformed_data.png\" width=\"90%\" align=\"left\">"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "import sagemaker\n",
    "import pandas as pd\n",
    "import json\n",
    "\n",
    "sess = sagemaker.Session()\n",
    "bucket = sess.default_bucket()\n",
    "role = sagemaker.get_execution_role()\n",
    "region = boto3.Session().region_name\n",
    "\n",
    "sm = boto3.Session().client(service_name=\"sagemaker\", region_name=region)\n",
    "firehose = boto3.Session().client(service_name=\"firehose\", region_name=region)\n",
    "lam = boto3.Session().client(service_name=\"lambda\", region_name=region)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Check IAM Roles Are In Place"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r iam_lambda_role_name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    iam_lambda_role_name\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run all previous notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(iam_lambda_role_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r iam_lambda_role_passed"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    iam_lambda_role_passed\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run all previous notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(iam_lambda_role_passed)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if not iam_lambda_role_passed:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run all previous notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "else:\n",
    "    print(\"[OK]\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r iam_role_lambda_arn"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    iam_role_lambda_arn\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run all previous notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(iam_role_lambda_arn)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Review Lambda Function"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lambda_fn_name_invoke_ep = \"InvokeSageMakerEndpointFromKinesis\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store lambda_fn_name_invoke_ep"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "!pygmentize src/invoke_sm_endpoint_from_kinesis.py"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Test the PyTorch Endpoint Similar to How the Lambda Invokes the Endpoint"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r pytorch_endpoint_name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    pytorch_endpoint_name\n",
    "    print(\"[OK]\")\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run the notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "print(pytorch_endpoint_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    waiter = sm.get_waiter(\"endpoint_in_service\")\n",
    "    waiter.wait(EndpointName=pytorch_endpoint_name)\n",
    "except:\n",
    "    print(\"###################\")\n",
    "    print(\"The endpoint is not running.\")\n",
    "    print(\"Please re-run the model deployment section to deploy the endpoint.\")\n",
    "    print(\"###################\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "inputs = [\n",
    "    {\"features\": [\"I love this product!\"]},\n",
    "    {\"features\": [\"OK, but not great.\"]},\n",
    "    {\"features\": [\"This is not the right product.\"]},\n",
    "]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.predictor import Predictor\n",
    "from sagemaker.serializers import JSONLinesSerializer\n",
    "from sagemaker.deserializers import JSONLinesDeserializer\n",
    "\n",
    "predictor = Predictor(\n",
    "    endpoint_name=pytorch_endpoint_name,\n",
    "    serializer=JSONLinesSerializer(),\n",
    "    deserializer=JSONLinesDeserializer(),\n",
    "    sagemaker_session=sess\n",
    ")\n",
    "\n",
    "predicted_classes = predictor.predict(inputs)\n",
    "\n",
    "for predicted_class in predicted_classes:\n",
    "    print(\"Predicted class: {}\".format(predicted_class))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a .zip file for the Python dependencies (Lambda Layer)\n",
    "This requires us to create a directory called `python` for Python environments.\n",
    "\n",
    "_Note:  This may take 5-10 minutes.  Please be patient._"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!rm -rf layer/python\n",
    "!mkdir -p layer/python\n",
    "!pip install -q --target layer/python sagemaker==2.38.0\n",
    "!cd layer && zip -q --recurse-paths layer.zip ."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### _Please ignore ERROR's and WARNING's ^^ above ^^._"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "with open(\"layer/layer.zip\", \"rb\") as f:\n",
    "    layer = f.read()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from botocore.exceptions import ClientError\n",
    "\n",
    "sagemaker_lambda_layer_name = 'sagemaker-python-sdk-layer'\n",
    "layer_response = lam.publish_layer_version(\n",
    "    LayerName=sagemaker_lambda_layer_name,\n",
    "    Content={\"ZipFile\": layer},\n",
    "    Description=\"Layer with 'pip install sagemaker'\",\n",
    "    CompatibleRuntimes=['python3.9']\n",
    ")\n",
    "\n",
    "layer_version_arn = layer_response['LayerVersionArn']\n",
    "\n",
    "print(\"Lambda layer {} successfully created with LayerVersionArn {}.\".format(sagemaker_lambda_layer_name, layer_version_arn))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a .zip file for our Python code (Lambda Function)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!zip src/InvokeSageMakerEndpointFromKinesis.zip src/invoke_sm_endpoint_from_kinesis.py"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "with open(\"src/InvokeSageMakerEndpointFromKinesis.zip\", \"rb\") as f:\n",
    "    code = f.read()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create The Lambda Function"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from botocore.exceptions import ClientError\n",
    "\n",
    "try:\n",
    "    response = lam.create_function(\n",
    "        FunctionName=\"{}\".format(lambda_fn_name_invoke_ep),\n",
    "        Runtime=\"python3.9\",\n",
    "        Role=\"{}\".format(iam_role_lambda_arn),\n",
    "        Handler=\"src/invoke_sm_endpoint_from_kinesis.lambda_handler\",\n",
    "        Code={\"ZipFile\": code},\n",
    "        Layers=[\n",
    "            layer_version_arn\n",
    "        ],\n",
    "        Description=\"Query SageMaker Endpoint for star rating prediction on review input text.\",\n",
    "        # max timeout supported by Firehose is 5min\n",
    "        Timeout=300,\n",
    "        MemorySize=128,\n",
    "        Publish=True,\n",
    "    )\n",
    "    print(\"Lambda Function {} successfully created.\".format(lambda_fn_name_invoke_ep))\n",
    "except ClientError as e:\n",
    "    if e.response[\"Error\"][\"Code\"] == \"ResourceConflictException\":\n",
    "        response = lam.update_function_code(\n",
    "            FunctionName=\"{}\".format(lambda_fn_name_invoke_ep), ZipFile=code, Publish=True, DryRun=False\n",
    "        )\n",
    "        print(\"Updating existing Lambda Function {}.  This is OK.\".format(lambda_fn_name_invoke_ep))\n",
    "    else:\n",
    "        print(\"Error: {}\".format(e))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = lam.get_function(FunctionName=lambda_fn_name_invoke_ep)\n",
    "\n",
    "lambda_fn_arn_invoke_ep = response[\"Configuration\"][\"FunctionArn\"]\n",
    "print(lambda_fn_arn_invoke_ep)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store lambda_fn_arn_invoke_ep"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Review Lambda Function"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(\n",
    "    HTML(\n",
    "        '<b>Review <a target=\"blank\" href=\"https://console.aws.amazon.com/lambda/home?region={}#/functions/{}\"> Lambda Function</a></b>'.format(\n",
    "            region, lambda_fn_name_invoke_ep\n",
    "        )\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Configure Lambda With Endpoint"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = lam.update_function_configuration(\n",
    "    FunctionName=lambda_fn_name_invoke_ep, Environment={\"Variables\": {\"ENDPOINT_NAME\": pytorch_endpoint_name}}\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Store Variables for Next Notebooks"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Release Resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%html\n",
    "\n",
    "<p><b>Shutting down your kernel for this notebook to release resources.</b></p>\n",
    "<button class=\"sm-command-button\" data-commandlinker-command=\"kernelmenu:shutdown\" style=\"display:none;\">Shutdown Kernel</button>\n",
    "        \n",
    "<script>\n",
    "try {\n",
    "    els = document.getElementsByClassName(\"sm-command-button\");\n",
    "    els[0].click();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}    \n",
    "</script>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%javascript\n",
    "\n",
    "try {\n",
    "    Jupyter.notebook.save_checkpoint();\n",
    "    Jupyter.notebook.session.delete();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "instance_type": "ml.t3.medium",
  "kernelspec": {
   "display_name": "Python 3 (Data Science)",
   "language": "python",
   "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
